home *** CD-ROM | disk | FTP | other *** search
- {*********************************************************}
- {* AAThdNCp *}
- {* Copyright (c) Julian M Bucknall 1998-2000 *}
- {* All rights reserved. *}
- {*********************************************************}
- {* Algorithms Alfresco: multithreaded multibuffered copy *}
- {*********************************************************}
-
- {Note: this unit is released as freeware. In other words, you are free
- to use this unit in your own applications, however I retain all
- copyright to the code. JMB}
-
- unit AAThdNCp;
-
- interface
-
- uses
- SysUtils, Windows, Classes;
-
- const
- aac_MaxConsumers = 32;
-
- type
- PaaStreamArray = ^TaaStreamArray;
- TaaStreamArray = array [0..pred(aac_MaxConsumers)] of TStream;
-
- procedure AAThreadedMultiCopyStream(aSrcStream : TStream;
- aDestCount : integer;
- aDestStreams : PaaStreamArray);
-
- implementation
-
- const
- BufferSize = 1024;
-
- type
- PBuffer = ^TBuffer;
- TBuffer = packed record
- bToReadCount : integer;
- bCount : longint;
- bBlock : array [0..pred(BufferSize)] of byte;
- end;
-
- PBufferArray = ^TBufferArray;
- TBufferArray = array [0..1023] of PBuffer;
-
- TQueuedBuffers = class
- private
- FBufCount : integer;
- FBuffers : PBufferArray;
- FConsumerCount : integer;
- FHead : array [0..pred(aac_MaxConsumers)] of integer;
- FIsNotEmpty : array [0..pred(aac_MaxConsumers)] of THandle;
- {semaphores}
- FIsNotFull : THandle; {semaphore}
- FTail : integer;
- protected
- function qbGetHead(aInx : integer) : PBuffer;
- function qbGetIsNotEmpty(aInx : integer) : THandle;
- function qbGetTail : PBuffer;
- public
- constructor Create(aBufferCount : integer;
- aConsumerCount : integer);
- destructor Destroy; override;
-
- procedure AdvanceHead(aConsumerId : integer);
- procedure AdvanceTail;
-
- property Head[aInx : integer] : PBuffer read qbGetHead;
- property Tail : PBuffer read qbGetTail;
-
- property IsNotEmpty[aInx : integer] : THandle
- read qbGetIsNotEmpty;
- property IsNotFull : THandle read FIsNotFull;
-
- property ConsumerCount : integer read FConsumerCount;
- end;
-
- type
- TProducer = class(TThread)
- private
- FStream : TStream;
- FBuffers : TQueuedBuffers;
- protected
- procedure Execute; override;
- public
- constructor Create(aStream : TStream;
- aBuffers : TQueuedBuffers);
- destructor Destroy; override;
- end;
-
- type
- TConsumer = class(TThread)
- private
- FStream : TStream;
- FBuffers : TQueuedBuffers;
- FID : integer;
- protected
- procedure Execute; override;
- public
- constructor Create(aStream : TStream;
- aBuffers : TQueuedBuffers;
- aID : integer);
- destructor Destroy; override;
- end;
-
- {===TQueuedBuffers===================================================}
- constructor TQueuedBuffers.Create(aBufferCount : integer;
- aConsumerCount : integer);
- var
- i : integer;
- begin
- inherited Create;
- {allocate the buffers}
- FBuffers := AllocMem(aBufferCount * sizeof(pointer));
- for i := 0 to pred(aBufferCount) do
- GetMem(FBuffers^[i], sizeof(TBuffer));
- FBufCount := aBufferCount;
- {create the semaphores}
- FConsumerCount := aConsumerCount;
- FIsNotFull := CreateSemaphore(nil, aBufferCount, aBufferCount, '');
- for i := 0 to pred(aConsumerCount) do
- FIsNotEmpty[i] := CreateSemaphore(nil, 0, aBufferCount, '');
- end;
- {--------}
- destructor TQueuedBuffers.Destroy;
- var
- i : integer;
- begin
- {destroy the semaphores}
- if (FIsNotFull <> 0) then
- CloseHandle(FIsNotFull);
- for i := 0 to pred(ConsumerCount) do
- if (FIsNotEmpty[i] <> 0) then
- CloseHandle(FIsNotEmpty[i]);
- {free the buffers}
- if (FBuffers <> nil) then begin
- for i := 0 to pred(FBufCount) do
- if (FBuffers^[i] <> nil) then
- FreeMem(FBuffers^[i], sizeof(TBuffer));
- FreeMem(FBuffers, FBufCount * sizeof(pointer));
- end;
- inherited Destroy;
- end;
- {--------}
- procedure TQueuedBuffers.AdvanceHead(aConsumerId : integer);
- begin
- inc(FHead[aConsumerId]);
- if (FHead[aConsumerId] = FBufCount) then
- FHead[aConsumerId] := 0;
- end;
- {--------}
- procedure TQueuedBuffers.AdvanceTail;
- begin
- inc(FTail);
- if (FTail = FBufCount) then
- FTail := 0;
- end;
- {--------}
- function TQueuedBuffers.qbGetHead(aInx : integer) : PBuffer;
- begin
- Result := FBuffers^[FHead[aInx]];
- end;
- {--------}
- function TQueuedBuffers.qbGetIsNotEmpty(aInx : integer) : THandle;
- begin
- Result := FIsNotEmpty[aInx];
- end;
- {--------}
- function TQueuedBuffers.qbGetTail : PBuffer;
- begin
- Result := FBuffers^[FTail];
- end;
- {====================================================================}
-
-
- {===TProducer========================================================}
- constructor TProducer.Create(aStream : TStream;
- aBuffers : TQueuedBuffers);
- begin
- inherited Create(true);
- FStream := aStream;
- FBuffers := aBuffers;
- end;
- {--------}
- destructor TProducer.Destroy;
- begin
- inherited Destroy;
- end;
- {--------}
- procedure TProducer.Execute;
- var
- Tail : PBuffer;
- i : integer;
- begin
- {do until the stream is exhausted...}
- repeat
- {get the 'queue is not full' semaphore}
- WaitForSingleObject(FBuffers.IsNotFull, INFINITE);
- {read a block from the stream into the head buffer}
- Tail := FBuffers.Tail;
- Tail^.bCount := FStream.Read(Tail^.bBlock, BufferSize);
- Tail^.bToReadCount := FBuffers.ConsumerCount;
- {advance the hail pointer}
- FBuffers.AdvanceTail;
- {as we've written a new buffer, signal all the 'queue is not
- empty' semaphores}
- for i := 0 to pred(FBuffers.ConsumerCount) do
- ReleaseSemaphore(FBuffers.IsNotEmpty[i], 1, nil);
- until (Tail^.bCount = 0);
- end;
- {====================================================================}
-
-
- {===TConsumer========================================================}
- constructor TConsumer.Create(aStream : TStream;
- aBuffers : TQueuedBuffers;
- aID : integer);
- begin
- inherited Create(true);
- FStream := aStream;
- FBuffers := aBuffers;
- FID := aID;
- end;
- {--------}
- destructor TConsumer.Destroy;
- begin
- inherited Destroy;
- end;
- {--------}
- procedure TConsumer.Execute;
- var
- Head : PBuffer;
- NumToRead : integer;
- begin
- {get our 'queue is not empty' semaphore}
- WaitForSingleObject(FBuffers.IsNotEmpty[FID], INFINITE);
- {get the head buffer}
- Head := FBuffers.Head[FID];
- {while the head buffer is not empty...}
- while (Head^.bCount <> 0) do begin
- {write a block from the head buffer into the stream}
- FStream.Write(Head^.bBlock, Head^.bCount);
- {we've finished with this buffer, so safely decrement the count of
- consumers who have still to read this buffer}
- NumToRead := InterlockedDecrement(Head^.bToReadCount);
- {advance our head pointer}
- FBuffers.AdvanceHead(FID);
- {if we were the last consumer to read this buffer...}
- if (NumToRead = 0) then
- {signal the 'queue is not full' semaphore}
- ReleaseSemaphore(FBuffers.IsNotFull, 1, nil);
- {get our 'queue is not empty' semaphore}
- WaitForSingleObject(FBuffers.IsNotEmpty[FID], INFINITE);
- {get the head buffer}
- Head := FBuffers.Head[FID];
- end;
- end;
- {====================================================================}
-
-
- {===Interfaced routine===============================================}
- procedure AAThreadedMultiCopyStream(aSrcStream : TStream;
- aDestCount : integer;
- aDestStreams : PaaStreamArray);
- var
- i : integer;
- Buffers : TQueuedBuffers;
- Producer : TProducer;
- Consumer : array [0..pred(aac_MaxConsumers)] of TConsumer;
- WaitArray : array [0..aac_MaxConsumers] of THandle;
- begin
- Buffers := nil;
- Producer := nil;
- for i := 0 to pred(aac_MaxConsumers) do
- Consumer[i] := nil;
- for i := 0 to aac_MaxConsumers do
- WaitArray[i] := 0;
- try
- {create the queued buffer object}
- Buffers := TQueuedBuffers.Create(20, aDestCount);
- {create the producer thread, save its handle}
- Producer := TProducer.Create(aSrcStream, Buffers);
- WaitArray[0] := Producer.Handle;
- {create the consumer threads, save their handles}
- for i := 0 to pred(aDestCount) do begin
- Consumer[i] := TConsumer.Create(aDestStreams^[i], Buffers, i);
- WaitArray[i+1] := Consumer[i].Handle;
- end;
- {start the threads up}
- for i := 0 to pred(aDestCount) do
- Consumer[i].Resume;
- Producer.Resume;
- {wait for the threads to finish}
- WaitForMultipleObjects(1+aDestCount, @WaitArray, true, INFINITE);
- finally
- Producer.Free;
- for i := 0 to pred(aDestCount) do
- Consumer[i].Free;
- Buffers.Free;
- end;
- end;
- {====================================================================}
-
- end.
-